
feat: processor refactor #45

Merged
merged 5 commits on Jan 25, 2025

Conversation

@ponderingdemocritus (Contributor) commented Jan 25, 2025

  • Abstracts the processor so that multiple processors can be registered. This allows optimal processing of any data type and lets users add their own processors if they choose. (A sketch of the intended usage follows below.)
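A hedged sketch of what that user extensibility could look like. The canHandle()/process()/getName() contract comes from the BaseProcessor introduced in this PR; the import paths, constructor wiring, and exact ProcessedResult fields below are assumptions, not code from the diff.

// Sketch only, not code from this diff: a user-defined processor built on the new
// BaseProcessor abstraction.
import { BaseProcessor } from "./packages/core/src/core/processor";
import type { IOHandler, ProcessedResult } from "./packages/core/src/core/types";

class TweetThreadProcessor extends BaseProcessor {
    // Claim only content that looks like a batch of tweets.
    public canHandle(content: any): boolean {
        return Array.isArray(content) && content[0]?.type === "tweet";
    }

    public async process(
        content: any,
        otherContext: string,
        ioContext?: { availableOutputs: IOHandler[]; availableActions: IOHandler[] }
    ): Promise<ProcessedResult> {
        // ...summarize the thread and decide on suggested outputs/actions here...
        return { content, metadata: { handledBy: this.getName() } } as ProcessedResult;
    }
}

// Because the orchestrator now takes an array of processors, a custom processor can be
// registered next to the built-in MessageProcessor, e.g.
//   new Orchestrator(..., [messageProcessor, tweetThreadProcessor], ...)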

Summary by CodeRabbit

Release Notes

  • New Features

    • Introduced a more flexible processor architecture supporting multiple processors.
    • Added time context categorization for content.
    • Enhanced content identification with unique ID generation.
    • Added methods to retrieve memories from specific rooms.
    • Introduced a new MessageProcessor class for handling messages.
  • Improvements

    • Refactored processor initialization to support more dynamic processing.
    • Updated orchestrator to handle multiple processors.
    • Improved method for determining processor compatibility with content.
    • Enhanced detail in the data returned from mentions input handler.
  • Technical Updates

    • Replaced single Processor with abstract BaseProcessor and MessageProcessor.
    • Modified core processing logic to be more extensible.
    • Updated access levels for certain methods in the vector database.

coderabbitai bot (Contributor) commented Jan 25, 2025

Walkthrough

This pull request introduces a significant refactoring of the processor architecture in the core system. The changes involve replacing the Processor class with an abstract BaseProcessor and a new MessageProcessor class. Modifications affect multiple files, including example configurations and core utility functions. The orchestrator is updated to manage multiple processors, and new utility functions are added for content identification and time context. Overall, the changes enhance the flexibility and extensibility of the processing mechanism.
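For orientation, here is how the two new utilities behave, based on their implementations quoted later in this review; the import path is illustrative.

// Usage sketch for the new utilities; behaviour follows the code shown in this review.
import { generateContentId, getTimeContext } from "packages/core/src/core/utils";

// Content from two hours ago falls into the first bucket (< 24 hours).
const bucket = getTimeContext(new Date(Date.now() - 2 * 60 * 60 * 1000)); // "very_recent"

// A single tweet is keyed by its tweet ID.
const id = generateContentId({ type: "tweet", metadata: { tweetId: "123" } }); // "tweet_123"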

Changes

File | Change Summary
examples/example-api.ts | Replaced Processor with MessageProcessor; updated orchestrator initialization to accept an array of processors.
examples/example-twitter.ts | Similar changes to example-api.ts; switched to MessageProcessor and modified the mentions handler to return detailed metadata.
packages/core/src/core/orchestrator.ts | Major refactoring to support multiple processors; added processContent method, updated internal state management.
packages/core/src/core/processor.ts | Introduced BaseProcessor abstract class, added MessageProcessor, modified processing logic and constructor parameters.
packages/core/src/core/utils.ts | Added getTimeContext and generateContentId utility functions for time categorization and unique ID generation.
packages/core/src/core/room-manager.ts | Added getMemoriesFromRoom, hasProcessedContentInRoom, and markContentAsProcessed methods for room memory management.
packages/core/src/core/vector-db.ts | Added getMemoriesFromRoom method to retrieve memories from a specified room's collection; changed access level of certain methods to public.
packages/core/src/core/processors/message-processor.ts | Added MessageProcessor class that extends BaseProcessor and implements processing logic for messages.
packages/core/src/core/index.ts | Updated imports and exports to reflect changes in processor classes, removed several entities, and added Processors.
packages/core/src/core/processors/index.ts | Added export for MessageProcessor from the message-processor module.

Sequence Diagram

sequenceDiagram
    participant O as Orchestrator
    participant P as MessageProcessor
    participant R as Room
    
    O->>P: processContent(content, source)
    P->>P: canHandle(content)
    alt Content can be handled
        P->>R: Process content
        P-->>O: Return ProcessedResult
    else Content cannot be handled
        O-->>O: Log "No suitable processor"
        O-->>O: Return null
    end

Possibly related PRs

  • fix: Updates #37: The changes in examples/example-api.ts introduce a new implementation of a Twitter bot that modifies the orchestration logic to include a ScheduledTaskMongoDb instance, which is relevant to the changes made in the main PR regarding the example-api.ts file.
  • feat: llm clients #30: The modifications in examples/example-twitter.ts include updates to the LLMClient and Processor, which are also relevant to the changes in the main PR that involve the example-api.ts file.
  • feat: abstract mongo db to allow persistent tasks #42: The introduction of ScheduledTaskMongoDb in the main PR aligns with the changes made in examples/example-api.ts, which also integrates this new functionality for managing scheduled tasks in MongoDB.

Poem

🐰 A Processor's Tale of Transformation
From single class to abstract art,
Processors dance, each playing their part.
BaseProcessor leads, MessageProcessor follows,
Flexibility blooms where rigidity once wallows.
Code evolves, with rabbits' delight! 🚀


coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

🔭 Outside diff range comments (1)
packages/core/src/core/processor.ts (1)

Line range hint 140-165: Prompt construction is thorough but watch for potential injection.
If user input is included without sanitization, malicious content might alter your prompt in unexpected ways. Consider sanitizing or bounding user-provided content to prevent injection attempts.

-const contentStr = ...
+const sanitizedContent = sanitizeContent(contentStr);
+const prompt = ...
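Note that sanitizeContent is not defined anywhere in this PR; a hypothetical version might simply bound the input and neutralize prompt-control markers.

// Hypothetical helper, not part of this PR: bound user-provided content and strip
// sequences commonly used to break out of a prompt template.
function sanitizeContent(raw: string, maxLength = 4000): string {
    return raw
        .slice(0, maxLength) // cap the size of user input embedded in the prompt
        .replace(/```/g, "'''") // neutralize code-fence markers
        .replace(/<\/?(system|assistant|user)>/gi, ""); // drop role-like tags
}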
🧹 Nitpick comments (11)
packages/core/src/core/processor.ts (5)

13-50: Solid approach to an abstract BaseProcessor.
By defining BaseProcessor with canHandle() and process() as abstract methods, you've cleanly enforced a contract for specialized processors. This design fosters extensibility when multiple processors are added. Consider adding inline documentation or test coverage to ensure all subclasses meet the contract consistently.
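A sketch of the kind of contract test this could take; a Vitest-style runner and the fixtures for the LLM client and character config are assumptions, not part of this PR.

// Sketch only: assumes a Vitest-style runner and that fixtures exist elsewhere in the repo.
import { describe, expect, it } from "vitest";
import { MessageProcessor } from "packages/core/src/core/processors/message-processor";

declare const llmClient: any; // fixture: a configured LLMClient
declare const defaultCharacter: any; // fixture: the default character definition

describe("BaseProcessor contract", () => {
    it("subclasses expose getName() and a boolean canHandle()", () => {
        const processor = new MessageProcessor(llmClient, defaultCharacter);
        expect(processor.getName()).toBe("message");
        expect(typeof processor.canHandle("hello world")).toBe("boolean");
    });
});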


52-70: MessageProcessor extends BaseProcessor effectively.
The constructor and call to super() are correct. However, consider addressing the "TODO: fix this" placeholder in canHandle() so it doesn't always return true. If it's truly universal, rename or remove the TODO.


75-109: Consider removing or refactoring the commented-out logic.
Large blocks of commented-out code can confuse future maintainers. If you plan on reviving it, track it in version control or place it behind a feature flag. Otherwise, remove it and rely on source history.


280-289: Robust error handling on LLM processing failures.
Returning a partial ProcessedResult ensures the rest of the system can continue without throwing unhandled exceptions. Consider logging more context if repeated failures occur or add metrics for observability.
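A minimal sketch of the kind of failure counter this comment has in mind; the metrics sink and thresholds are assumptions, not part of the PR.

// Hypothetical observability hook: count consecutive LLM failures per processor so
// repeated errors show up in logs, dashboards, or alerts.
const llmFailureCounts = new Map<string, number>();

function recordLlmFailure(processorName: string, error: unknown): void {
    const count = (llmFailureCounts.get(processorName) ?? 0) + 1;
    llmFailureCounts.set(processorName, count);
    if (count >= 3) {
        // Swap console.warn for a real metrics client (Prometheus, StatsD, ...) in practice.
        console.warn(`LLM failed ${count} consecutive times in ${processorName}`, error);
    }
}

function recordLlmSuccess(processorName: string): void {
    llmFailureCounts.set(processorName, 0); // reset the streak on success
}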


298-364: Remove or convert commented-out logic into a plugin/feature-flag system.
Leaving large chunks of commented code can create confusion. Store any transitional features in version control or place behind a toggle if you plan to reintroduce them soon.

packages/core/src/core/orchestrator.ts (3)

49-53: Populate processor map by name while ensuring uniqueness.
Consider validating that the name is non-duplicate or raising an error if a name collision occurs.


71-119: processContent method is well-structured and easy to follow.

  1. It confirms room existence.
  2. It locates the first capable processor.
  3. If none is found, it logs a fallback.

Consider letting the user specify which processor to apply, or fall back to a default, for more fine-grained control (a sketch follows below).
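One hedged way the suggestion could look; processContent in this PR does not take a processor name, so the extra parameter below is an assumption.

// Sketch of the suggestion, not the PR's implementation: resolve an explicitly named
// processor first, then fall back to the canHandle() scan, then to a default.
import type { BaseProcessor } from "./processor";

function selectProcessor(
    processors: Map<string, BaseProcessor>,
    content: unknown,
    preferredName?: string,
    defaultName = "message"
): BaseProcessor | undefined {
    if (preferredName && processors.has(preferredName)) {
        return processors.get(preferredName);
    }
    const capable = Array.from(processors.values()).find((p) => p.canHandle(content));
    return capable ?? processors.get(defaultName);
}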

Line range hint 304-364: Multi-processor workflow in runAutonomousFlow is well-implemented.
Looping until queue is empty ensures that new content from actions is fully processed. Consider whether you might need concurrency control in high-throughput environments.
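If throughput ever becomes an issue, a small hand-rolled limiter along these lines (hypothetical, not in the PR) would cap the number of in-flight processContent calls.

// Hypothetical concurrency limiter: process queued items with a bounded number of
// concurrent worker calls instead of strictly one at a time.
async function processQueueConcurrently<T>(
    queue: T[],
    worker: (item: T) => Promise<void>,
    maxConcurrent = 4
): Promise<void> {
    const inFlight = new Set<Promise<void>>();
    for (const item of queue) {
        const task: Promise<void> = worker(item).finally(() => {
            inFlight.delete(task);
        });
        inFlight.add(task);
        if (inFlight.size >= maxConcurrent) {
            await Promise.race(inFlight); // wait for a slot to free up
        }
    }
    await Promise.all(inFlight);
}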

examples/example-api.ts (1)

63-63: LGTM! Architecture improvement.

The change to accept an array of processors ([processor]) aligns with the PR objective of enabling multiple processors. This enhancement provides better extensibility and flexibility in processing pipelines.

packages/core/src/core/utils.ts (2)

255-264: Add configuration options for time thresholds.

The function has hardcoded time thresholds. Consider making these configurable to support different use cases.

+interface TimeThresholds {
+  veryRecent: number; // hours
+  recent: number;
+  thisWeek: number;
+  thisMonth: number;
+}
+
+const DEFAULT_TIME_THRESHOLDS: TimeThresholds = {
+  veryRecent: 24,
+  recent: 72,
+  thisWeek: 168,
+  thisMonth: 720,
+};
+
-export function getTimeContext(timestamp: Date): string {
+export function getTimeContext(
+  timestamp: Date,
+  thresholds: TimeThresholds = DEFAULT_TIME_THRESHOLDS
+): string {
   const now = new Date();
   const hoursDiff = (now.getTime() - timestamp.getTime()) / (1000 * 60 * 60);

-  if (hoursDiff < 24) return "very_recent";
-  if (hoursDiff < 72) return "recent";
-  if (hoursDiff < 168) return "this_week";
-  if (hoursDiff < 720) return "this_month";
+  if (hoursDiff < thresholds.veryRecent) return "very_recent";
+  if (hoursDiff < thresholds.recent) return "recent";
+  if (hoursDiff < thresholds.thisWeek) return "this_week";
+  if (hoursDiff < thresholds.thisMonth) return "this_month";
   return "older";
 }

299-303: Consider adding type information for better type safety.

The relevantData object could benefit from explicit type information to prevent potential runtime errors.

+interface RelevantData {
+  content: any;
+  type?: string;
+}
+
-const relevantData = {
+const relevantData: RelevantData = {
   content: item.content || item,
   type: item.type,
 };
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1743423 and 2e3b338.

📒 Files selected for processing (5)
  • examples/example-api.ts (3 hunks)
  • examples/example-twitter.ts (3 hunks)
  • packages/core/src/core/orchestrator.ts (6 hunks)
  • packages/core/src/core/processor.ts (4 hunks)
  • packages/core/src/core/utils.ts (1 hunks)
🔇 Additional comments (13)
packages/core/src/core/processor.ts (6)

4-5: Great use of typed imports.
No concerns here; everything is well-aligned with TypeScript best practices.


29-32: Good addition of a self-identifying getName() method.
This method simplifies debugging and ensures each processor can be referenced by name.


34-38: Abstract canHandle method is well-defined.
It correctly enforces explicit capability checks in subclasses.


39-50: Abstract process method sets a clear contract.
It encourages uniform handling of content while letting subclasses implement specialized logic.


122-133: Well-structured schema generation for outputs and actions.
This approach, mapping over the ioContext handlers, is concise. If there are many handlers, watch for potential performance overhead when stringifying large schemas. Otherwise, this is fine.


261-265: Appropriate fallback usage of optional chaining.
This ensures you gracefully handle cases where no outputs are available.

packages/core/src/core/orchestrator.ts (3)

4-4: Good usage of typed imports for new processor types.
Encourages a clear boundary between the orchestrator and its processors.


24-24: Use of Map for processors is well-structured.
This data structure allows for quick lookups by processor name and supports dynamic additions or removals if needed in the future.


44-44: Constructor signature updated to support multiple processors.
This elegantly sets the stage for a genuinely extensible system.

examples/example-twitter.ts (3)

15-15: Replacing Processor import with MessageProcessor is consistent with your new design.
Shows a clear adoption of the extended BaseProcessor.


44-44: Initialization of MessageProcessor with the default character is straightforward.
Consider validating that the model and character traits are compatible to avoid unexpected LLM behavior.


65-65: Passing an array of processors is a future-proof design choice.
Even if there's only one active processor, the array-based approach positions the system for multi-processor scaling.

examples/example-api.ts (1)

42-45: Verify the removal of vectorDb parameter.

The MessageProcessor no longer requires vectorDb. Please confirm that vector database functionality is handled appropriately in the new architecture.

✅ Verification successful

Vector database functionality has been properly decoupled

The removal of vectorDb from MessageProcessor is part of an intentional architectural improvement. The vector database functionality is now handled by a dedicated VectorDB service, which is directly used by components that need it, following better separation of concerns.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check if vectorDb is used elsewhere in MessageProcessor or its parent classes
# Look for vectorDb usage in processor-related files
rg -A 5 "vectorDb" packages/core/src/core/processor.ts

Length of output: 746


Script:

#!/bin/bash
# Search for vector database related files and classes
fd "vector|memory" --type f --exec rg -l "class.*(?i)(vector|memory)" {} \;

# Search for new implementations of similar functionality
ast-grep --pattern 'class $_ implements $_ {
  $$$
  findSimilar$_($$$) {
    $$$
  }
  $$$
}'

Length of output: 215


Script:

#!/bin/bash
# Check the vector-db.ts implementation and its usage
cat packages/core/src/core/vector-db.ts

# Look for where vector-db is being used
rg -l "import.*vector-db"

Length of output: 56082

Comment on lines +266 to +361
export function generateContentId(content: any): string {
    try {
        // 1. Special handling for Twitter mentions/tweets array
        if (Array.isArray(content) && content[0]?.type === "tweet") {
            // Use the newest tweet's ID as the marker
            const newestTweet = content[0];
            return `tweet_batch_${newestTweet.metadata.tweetId}`;
        }

        // 2. Single tweet handling
        if (content?.type === "tweet") {
            return `tweet_${content.metadata.tweetId}`;
        }

        // 3. If it's a plain string, fallback to hashing the string but also add a small random/time factor.
        //    This ensures repeated user messages with the same text won't collapse to the same ID.
        if (typeof content === "string") {
            // Add a short suffix: e.g. timestamp + small random
            const suffix = `${Date.now()}_${Math.random()
                .toString(36)
                .slice(2, 6)}`;
            return `content_${hashString(content)}_${suffix}`;
        }

        // 4. For arrays (non-tweets), attempt to find known IDs or hash the items
        if (Array.isArray(content)) {
            const ids = content.map((item) => {
                // Check if there's an explicit .id
                if (item.id) return item.id;
                // Check for item.metadata?.id
                if (item.metadata?.id) return item.metadata.id;

                // Otherwise, hash the item
                const relevantData = {
                    content: item.content || item,
                    type: item.type,
                };
                return hashString(JSON.stringify(relevantData));
            });

            // Join them, but also add a short suffix so different array orders don't collide
            const suffix = `${Date.now()}_${Math.random()
                .toString(36)
                .slice(2, 6)}`;
            return `array_${ids.join("_").slice(0, 100)}_${suffix}`;
        }

        // 5. For single objects, check .id first
        if (content.id) {
            return `obj_${content.id}`;
        }

        // 6. Special handling for "internal_thought" or "consciousness"
        if (
            content.type === "internal_thought" ||
            content.source === "consciousness"
        ) {
            const thoughtData = {
                content: content.content,
                timestamp: content.timestamp,
            };
            return `thought_${hashString(JSON.stringify(thoughtData))}`;
        }

        // 7. Then check if there's a metadata.id
        if (content.metadata?.id) {
            return `obj_${content.metadata.id}`;
        }

        // 8. Or any metadata key ending with 'id'
        if (content.metadata) {
            for (const [key, value] of Object.entries(content.metadata)) {
                if (key.toLowerCase().endsWith("id") && value) {
                    return `obj_${value}`;
                }
            }
        }

        // 9. Finally, fallback to hashing the object,
        //    but add a random/time suffix so repeated content isn't auto-deduplicated.
        const relevantData = {
            content: content.content || content,
            type: content.type,
            // Include source if available
            ...(content.source &&
                content.source !== "consciousness" && {
                    source: content.source,
                }),
        };
        const baseHash = hashString(JSON.stringify(relevantData));
        const suffix = `${Date.now()}_${Math.random().toString(36).slice(2, 6)}`;
        return `obj_${baseHash}_${suffix}`;
    } catch (error) {
        return `fallback_${Date.now()}`;
    }
}

🛠️ Refactor suggestion

Refactor long function into smaller, focused functions.

The generateContentId function is quite long (95+ lines) and handles multiple content types. Consider breaking it down into smaller, specialized functions for better maintainability.

+function generateTweetId(content: any, isBatch: boolean = false): string {
+  if (isBatch) {
+    const newestTweet = content[0];
+    return `tweet_batch_${newestTweet.metadata.tweetId}`;
+  }
+  return `tweet_${content.metadata.tweetId}`;
+}
+
+function generateStringId(content: string): string {
+  const suffix = `${Date.now()}_${Math.random().toString(36).slice(2, 6)}`;
+  return `content_${hashString(content)}_${suffix}`;
+}
+
+function generateArrayId(content: any[]): string {
+  const ids = content.map((item) => {
+    if (item.id) return item.id;
+    if (item.metadata?.id) return item.metadata.id;
+    const relevantData = {
+      content: item.content || item,
+      type: item.type,
+    };
+    return hashString(JSON.stringify(relevantData));
+  });
+  const suffix = `${Date.now()}_${Math.random().toString(36).slice(2, 6)}`;
+  return `array_${ids.join("_").slice(0, 100)}_${suffix}`;
+}
+
+function generateThoughtId(content: any): string {
+  const thoughtData = {
+    content: content.content,
+    timestamp: content.timestamp,
+  };
+  return `thought_${hashString(JSON.stringify(thoughtData))}`;
+}
+
 export function generateContentId(content: any): string {
   try {
-    // 1. Special handling for Twitter mentions/tweets array
     if (Array.isArray(content) && content[0]?.type === "tweet") {
-      const newestTweet = content[0];
-      return `tweet_batch_${newestTweet.metadata.tweetId}`;
+      return generateTweetId(content, true);
     }
 
-    // 2. Single tweet handling
     if (content?.type === "tweet") {
-      return `tweet_${content.metadata.tweetId}`;
+      return generateTweetId(content);
     }
 
-    // 3. If it's a plain string...
     if (typeof content === "string") {
-      const suffix = `${Date.now()}_${Math.random().toString(36).slice(2, 6)}`;
-      return `content_${hashString(content)}_${suffix}`;
+      return generateStringId(content);
     }
 
-    // 4. For arrays (non-tweets)...
     if (Array.isArray(content)) {
-      const ids = content.map((item) => {
-        if (item.id) return item.id;
-        if (item.metadata?.id) return item.metadata.id;
-        const relevantData = {
-          content: item.content || item,
-          type: item.type,
-        };
-        return hashString(JSON.stringify(relevantData));
-      });
-      const suffix = `${Date.now()}_${Math.random().toString(36).slice(2, 6)}`;
-      return `array_${ids.join("_").slice(0, 100)}_${suffix}`;
+      return generateArrayId(content);
     }
 
     if (content.id) {
       return `obj_${content.id}`;
     }
 
-    // 6. Special handling for "internal_thought" or "consciousness"
     if (
       content.type === "internal_thought" ||
       content.source === "consciousness"
     ) {
-      const thoughtData = {
-        content: content.content,
-        timestamp: content.timestamp,
-      };
-      return `thought_${hashString(JSON.stringify(thoughtData))}`;
+      return generateThoughtId(content);
     }
     
     // Rest of the function remains the same...

Also, consider adding JSDoc documentation to improve code maintainability:

/**
 * Generates a unique content identifier based on the content type and properties.
 * 
 * @param content - The content to generate an ID for. Can be a tweet, string, array, or object.
 * @returns A unique string identifier for the content.
 * 
 * @example
 * ```typescript
 * // For a tweet
 * const tweetId = generateContentId({ type: 'tweet', metadata: { tweetId: '123' } });
 * // Returns: 'tweet_123'
 * 
 * // For a string
 * const stringId = generateContentId('hello');
 * // Returns: 'content_<hash>_<timestamp>_<random>'
 * ```
 */

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 4

🔭 Outside diff range comments (1)
packages/core/src/core/processor.ts (1)

Line range hint 74-264: Consider refactoring for better maintainability.

The process method is quite long and handles multiple responsibilities. Consider:

  1. Extract schema construction to separate methods
  2. Move prompt template to a separate file
  3. Extract LLM response handling logic
+private constructOutputSchema(outputs: IOHandler[]): string {
+    return outputs
+        .map((handler) => 
+            `${handler.name}: ${JSON.stringify(zodToJsonSchema(handler.schema, handler.name))}`)
+        .join("\n");
+}

+private constructPrompt(content: any, otherContext: string, ioContext?: {
+    availableOutputs: IOHandler[];
+    availableActions: IOHandler[];
+}): string {
+    const contentStr = typeof content === "string" ? content : JSON.stringify(content);
+    const outputsSchemaPart = ioContext?.availableOutputs 
+        ? this.constructOutputSchema(ioContext.availableOutputs)
+        : "";
+    const actionsSchemaPart = ioContext?.availableActions
+        ? this.constructOutputSchema(ioContext.availableActions)
+        : "";
+    
+    return `Analyze the following content...`; // Move template to separate file
+}

 async process(
     content: any,
     otherContext: string,
     ioContext?: {
         availableOutputs: IOHandler[];
         availableActions: IOHandler[];
     }
 ): Promise<ProcessedResult> {
     this.logger.debug("Processor.process", "Processing content", {
         content,
     });

-    const contentStr = typeof content === "string" ? content : JSON.stringify(content);
-    // ... rest of the implementation
+    const prompt = this.constructPrompt(content, otherContext, ioContext);
🧹 Nitpick comments (6)
packages/core/src/core/room-manager.ts (1)

229-251: LGTM! Consider adding input validation and documentation.

The implementation looks good with proper error handling and data mapping. Consider these minor improvements:

  1. Add JSDoc documentation describing the method's purpose and parameters
  2. Add input validation for negative limit values
+/**
+ * Retrieves memories from a specific room.
+ * @param roomId - The ID of the room to fetch memories from
+ * @param limit - Optional maximum number of memories to retrieve
+ * @returns Promise resolving to an array of Memory objects
+ */
 public async getMemoriesFromRoom(
     roomId: string,
     limit?: number
 ): Promise<Memory[]> {
+    if (limit !== undefined && limit < 0) {
+        throw new Error("Limit cannot be negative");
+    }
     if (!this.vectorDb) {
         throw new Error("VectorDB required for getting memories");
     }
packages/core/src/core/vector-db.ts (1)

1675-1708: LGTM! Consider adding sorting and filtering options.

The implementation is solid with good error handling and logging. Consider these enhancements:

  1. Add sorting options (e.g., by timestamp)
  2. Add metadata filtering capabilities
  3. Add JSDoc documentation
+/**
+ * Gets all memories from a specific room's collection.
+ * @param roomId - The ID of the room to fetch memories from
+ * @param limit - Optional maximum number of memories to retrieve
+ * @param options - Optional parameters for sorting and filtering
+ * @returns Promise resolving to an array of memories with content and metadata
+ */
 public async getMemoriesFromRoom(
     roomId: string,
-    limit?: number
+    limit?: number,
+    options?: {
+        sortBy?: { field: string; order: 'asc' | 'desc' };
+        where?: Record<string, any>;
+    }
 ): Promise<{ content: string; metadata?: Record<string, any> }[]> {
     try {
         const collection = await this.getCollectionForRoom(roomId);

         // Get all documents from the collection, with optional limit
         const results = await collection.get({
             limit,
+            where: options?.where,
             include: ["documents", "metadatas"] as IncludeEnum[],
         });

         if (!results.ids.length) {
             return [];
         }

+        let memories = results.ids.map((_, idx) => ({
+            content: results.documents[idx] || "",
+            metadata: results.metadatas?.[idx] || {},
+        }));
+
+        // Apply sorting if specified
+        if (options?.sortBy) {
+            memories.sort((a, b) => {
+                const aVal = a.metadata?.[options.sortBy!.field];
+                const bVal = b.metadata?.[options.sortBy!.field];
+                return options.sortBy!.order === 'asc' 
+                    ? (aVal > bVal ? 1 : -1)
+                    : (aVal < bVal ? 1 : -1);
+            });
+        }
+
+        return memories;
examples/example-twitter.ts (1)

106-113: LGTM! Consider adding type safety for metadata.

The return structure improvements add valuable context. Consider:

  1. Define a specific type for the mention metadata instead of using any
  2. Update the schema to validate the metadata structure
+interface TwitterMentionMetadata {
+    conversationId: string;
+    tweetId: string;
+    username: string;
+    timestamp: string;
+}

 return mentions.map((mention) => ({
     type: "tweet",
     room: mention.metadata.conversationId,
     messageId: mention.metadata.tweetId,
     user: mention.metadata.username,
     content: mention.content,
-    metadata: mention,
+    metadata: mention as TwitterMentionMetadata,
 }));

 schema: z.object({
     type: z.string(),
     room: z.string(),
     user: z.string(),
     content: z.string(),
-    metadata: z.record(z.any()),
+    metadata: z.object({
+        conversationId: z.string(),
+        tweetId: z.string(),
+        username: z.string(),
+        timestamp: z.string(),
+    }),
 }),

Also applies to: 117-118

packages/core/src/core/processor.ts (1)

12-49: LGTM! Consider enhancing error handling and documentation.

The abstract class provides a good foundation. Consider:

  1. Add JSDoc for abstract methods
  2. Add error handling guidelines for implementations
 export abstract class BaseProcessor {
     protected logger: Logger;

     constructor(
         protected metadata: { name: string; description: string },
         protected loggerLevel: LogLevel = LogLevel.ERROR,
         protected character: Character,
         protected llmClient: LLMClient
     ) {
         this.logger = new Logger({
             level: loggerLevel,
             enableColors: true,
             enableTimestamp: true,
         });
     }

     public getName(): string {
         return this.metadata.name;
     }

+    /**
+     * Determines if this processor can handle the given content.
+     * @param content - The content to check
+     * @returns true if the processor can handle the content, false otherwise
+     * @throws {Error} if content validation fails
+     */
     public abstract canHandle(content: any): boolean;

+    /**
+     * Processes the given content and returns a result.
+     * @param content - The content to process
+     * @param otherContext - Additional context for processing
+     * @param ioContext - Optional context with available handlers
+     * @returns Promise resolving to the processed result
+     * @throws {Error} if processing fails or validation errors occur
+     */
     public abstract process(
         content: any,
         otherContext: string,
         ioContext?: {
             availableOutputs: IOHandler[];
             availableActions: IOHandler[];
         }
     ): Promise<ProcessedResult>;
packages/core/src/core/orchestrator.ts (2)

4-4: Remove unused import MessageProcessor.

The MessageProcessor type is imported but never used in this file.

-import type { BaseProcessor, MessageProcessor } from "./processor";
+import type { BaseProcessor } from "./processor";

254-262: Improve processor selection and error handling.

Two suggestions for improvement:

  1. Extract processor finding logic into a reusable method
  2. Uncomment the warning log for better debugging when no processor is found
-// 2) Find the first processor that can handle it
-const processor = Array.from(this.processors.values()).find((p) =>
-    p.canHandle(data)
-);
+// 2) Find the first processor that can handle it
+const processor = this.findSuitableProcessor(data);

+private findSuitableProcessor(data: unknown): BaseProcessor | undefined {
+    return Array.from(this.processors.values()).find((p) =>
+        p.canHandle(data)
+    );
+}

 if (!processor) {
-    // this.logger.warn("No processor found for data", { source });
+    this.logger.warn("No processor found for data", { source });
     continue;
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2e3b338 and 22417b4.

📒 Files selected for processing (5)
  • examples/example-twitter.ts (4 hunks)
  • packages/core/src/core/orchestrator.ts (6 hunks)
  • packages/core/src/core/processor.ts (3 hunks)
  • packages/core/src/core/room-manager.ts (1 hunks)
  • packages/core/src/core/vector-db.ts (1 hunks)
🔇 Additional comments (1)
examples/example-twitter.ts (1)

44-47: LGTM! Processor initialization updated correctly.

The changes properly implement the new processor architecture:

  1. MessageProcessor instantiated with required parameters
  2. Processor passed as array to Orchestrator

Also applies to: 65-65

Comment on lines 69 to 72
    // TODO: fix this
    public canHandle(content: any): boolean {
        return true;
    }

⚠️ Potential issue

Fix the canHandle implementation.

The current implementation with return true and a TODO comment needs to be addressed:

  1. It bypasses proper content validation
  2. Could lead to processing invalid content
-    // TODO: fix this
-    public canHandle(content: any): boolean {
-        return true;
-    }
+    public canHandle(content: any): boolean {
+        if (!content) {
+            return false;
+        }
+        
+        // Validate content is string or has string representation
+        return (
+            typeof content === "string" ||
+            (typeof content === "object" && content.toString !== Object.prototype.toString)
+        );
+    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-    // TODO: fix this
-    public canHandle(content: any): boolean {
-        return true;
-    }
+    public canHandle(content: any): boolean {
+        if (!content) {
+            return false;
+        }
+        // Validate content is string or has string representation
+        return (
+            typeof content === "string" ||
+            (typeof content === "object" && content.toString !== Object.prototype.toString)
+        );
+    }

Comment on lines 272 to 280
  // 4) Save to memory (like you do in processInputTask)
  await this.roomManager.addMemory(
      room.id,
-     JSON.stringify(processed.content),
+     JSON.stringify(processed?.content),
      {
          source,
          type: "input",
-         ...processed.metadata,
-         ...processed.enrichedContext,
+         ...processed?.metadata,
+         ...processed?.enrichedContext,

⚠️ Potential issue

Handle potential JSON serialization errors.

The JSON.stringify call could throw an error if the content contains circular references or undefined values.

 await this.roomManager.addMemory(
     room.id,
-    JSON.stringify(processed?.content),
+    JSON.stringify(processed?.content, (_, value) => 
+        typeof value === 'undefined' ? null : value
+    ),
     {
         source,
         type: "input",
         ...processed?.metadata,
         ...processed?.enrichedContext,
     }
 );
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-    // 4) Save to memory (like you do in processInputTask)
-    await this.roomManager.addMemory(
-        room.id,
-        JSON.stringify(processed.content),
-        JSON.stringify(processed?.content),
-        {
-            source,
-            type: "input",
-            ...processed.metadata,
-            ...processed.enrichedContext,
-            ...processed?.metadata,
-            ...processed?.enrichedContext,
+    // 4) Save to memory (like you do in processInputTask)
+    await this.roomManager.addMemory(
+        room.id,
+        JSON.stringify(processed?.content, (_, value) =>
+            typeof value === 'undefined' ? null : value
+        ),
+        {
+            source,
+            type: "input",
+            ...processed?.metadata,
+            ...processed?.enrichedContext,

Comment on lines +49 to +53
        this.processors = new Map(
            processors.map((p) => {
                return [p.getName(), p];
            })
        );

🛠️ Refactor suggestion

Add validation for duplicate processor names.

When initializing the processors Map, there's no validation to ensure processor names are unique. If two processors return the same name, the latter will silently override the former.

 this.processors = new Map(
-    processors.map((p) => {
-        return [p.getName(), p];
-    })
+    processors.map((p) => {
+        const name = p.getName();
+        if (this.processors.has(name)) {
+            throw new Error(`Duplicate processor name: ${name}`);
+        }
+        return [name, p];
+    })
 );
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-        this.processors = new Map(
-            processors.map((p) => {
-                return [p.getName(), p];
-            })
-        );
+        const processorNames = new Set();
+        processors.forEach((p) => {
+            const name = p.getName();
+            if (processorNames.has(name)) {
+                throw new Error(`Duplicate processor name: ${name}`);
+            }
+            processorNames.add(name);
+        });
+        this.processors = new Map(
+            processors.map((p) => {
+                return [p.getName(), p];
+            })
+        );

Comment on lines 536 to 586
    public async processContent(
        content: any,
        source: string
    ): Promise<ProcessedResult | null> {
        let memories: Memory[] = [];

        if (content.room) {
            const room = await this.roomManager.ensureRoom(
                content.room,
                source
            );
            memories = await this.roomManager.getMemoriesFromRoom(room.id);

            this.logger.debug(
                "Orchestrator.processContent",
                "Processing content with context",
                {
                    content,
                    source,
                    roomId: room.id,
                    relevantMemories: memories,
                }
            );
        }

        const processor = Array.from(this.processors.values()).find((p) =>
            p.canHandle(content)
        );

        if (!processor) {
            this.logger.debug(
                "Orchestrator.processContent",
                "No suitable processor found for content",
                { content }
            );
            return null;
        }

        const availableOutputs = Array.from(this.ioHandlers.values()).filter(
            (h) => h.role === HandlerRole.OUTPUT
        );

        const availableActions = Array.from(this.ioHandlers.values()).filter(
            (h) => h.role === HandlerRole.ACTION
        );

        return processor.process(content, JSON.stringify(memories), {
            availableOutputs,
            availableActions,
        });
    }

🛠️ Refactor suggestion

Improve type safety and performance of processContent method.

Several suggestions for improvement:

  1. Add proper typing for the content parameter
  2. Reuse the processor finding logic
  3. Restructure for early returns
  4. Memoize handler filtering
+private availableOutputHandlers = new Map<HandlerRole, IOHandler[]>();
+
+private getHandlersByRole(role: HandlerRole): IOHandler[] {
+    if (!this.availableOutputHandlers.has(role)) {
+        this.availableOutputHandlers.set(
+            role,
+            Array.from(this.ioHandlers.values()).filter((h) => h.role === role)
+        );
+    }
+    return this.availableOutputHandlers.get(role) || [];
+}
+
-public async processContent(
+public async processContent<T extends Record<string, unknown>>(
-    content: any,
+    content: T,
     source: string
 ): Promise<ProcessedResult | null> {
     let memories: Memory[] = [];
-
-    if (content.room) {
-        const room = await this.roomManager.ensureRoom(
-            content.room,
-            source
-        );
-        memories = await this.roomManager.getMemoriesFromRoom(room.id);
-
-        this.logger.debug(
-            "Orchestrator.processContent",
-            "Processing content with context",
-            {
-                content,
-                source,
-                roomId: room.id,
-                relevantMemories: memories,
-            }
-        );
-    }
-
-    const processor = Array.from(this.processors.values()).find((p) =>
-        p.canHandle(content)
-    );
+    
+    if ('room' in content) {
+        const room = await this.roomManager.ensureRoom(content.room as string, source);
+        memories = await this.roomManager.getMemoriesFromRoom(room.id);
+        
+        this.logger.debug("Orchestrator.processContent", "Processing content with context", {
+            content,
+            source,
+            roomId: room.id,
+            relevantMemories: memories,
+        });
+    }
+    
+    const processor = this.findSuitableProcessor(content);
     
     if (!processor) {
         this.logger.debug(
@@ -569,7 +569,7 @@
         return null;
     }
     
-    const availableOutputs = Array.from(this.ioHandlers.values()).filter(
-        (h) => h.role === HandlerRole.OUTPUT
-    );
-
-    const availableActions = Array.from(this.ioHandlers.values()).filter(
-        (h) => h.role === HandlerRole.ACTION
-    );
+    const availableOutputs = this.getHandlersByRole(HandlerRole.OUTPUT);
+    const availableActions = this.getHandlersByRole(HandlerRole.ACTION);
     
     return processor.process(content, JSON.stringify(memories), {
         availableOutputs,
         availableActions,
     });
 }

Committable suggestion skipped: line range outside the PR's diff.

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

🧹 Nitpick comments (10)
packages/core/src/core/processors/message-processor.ts (3)

12-28: Consider making the processor name configurable.
Right now, the constructor hardcodes "message" as the processor name in its metadata. If you plan to add more message-like processors in the future, you might benefit from passing in a configurable name to avoid confusion.

- name: "message",
+ name: processorName ?? "message",

35-105: Extract prompt-building into a separate method.
The process method constructs a lengthy prompt inline, making it cumbersome to maintain. Consider moving this logic into a dedicated helper inside this class or a utility file.

async process(...) {
-    const prompt = `Analyze the following content...` // many lines
+    const prompt = this.buildPrompt(contentStr, otherContext, ioContext);
     ...
}

private buildPrompt(contentStr: string, otherContext: string, ioContext?: ...): string {
    return `Analyze the following content...`;
}

107-225: Include error details or logs in the fallback object.
Currently, the catch block sets a generic fallback with minimal metadata. Including at least the error message or a truncated stack trace could help downstream debugging.

- return {
-    content,
-    metadata: {},
-    ...
- };
+ return {
+    content,
+    metadata: { error: String(error) },
+    ...
+ };
packages/core/src/core/processor.ts (2)

13-28: Emphasize usage guidelines for metadata.
Currently, metadata is flexible but might need constraints. If future processors rely on certain structure, consider validating or partially typing it within the constructor.
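A hedged option, since the examples in this repository already use zod: validate the metadata shape once at construction time. The schema and its placement are assumptions, not part of this PR.

// Sketch only: validate processor metadata instead of accepting any shape.
import { z } from "zod";

const processorMetadataSchema = z.object({
    name: z.string().min(1),
    description: z.string().min(1),
});

export type ProcessorMetadata = z.infer<typeof processorMetadataSchema>;

// Hypothetical use inside BaseProcessor's constructor:
//   this.metadata = processorMetadataSchema.parse(metadata);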


50-68: Use stronger types for the content parameter.
Both canHandle and process accept content: any, which can lead to runtime errors if the data structure differs from expectations. Consider introducing a generic or union type if feasible.
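A sketch of the stronger typing this suggests; IOHandler and ProcessedResult come from the PR, while the generic class name, the type-guard form of canHandle, and the import path are assumptions.

// Sketch only: a generic processor base whose canHandle() narrows unknown input.
import type { IOHandler, ProcessedResult } from "./types";

export abstract class TypedProcessor<TContent> {
    // Narrow unknown input to the content type this processor understands.
    public abstract canHandle(content: unknown): content is TContent;

    public abstract process(
        content: TContent,
        otherContext: string,
        ioContext?: {
            availableOutputs: IOHandler[];
            availableActions: IOHandler[];
        }
    ): Promise<ProcessedResult>;
}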

packages/core/src/core/room-manager.ts (2)

253-274: Fix incorrect method name in error log.

The error log message uses markContentAsProcessed instead of hasProcessedContentInRoom.

Apply this diff to fix the error log message:

-                "RoomManager.markContentAsProcessed",
+                "RoomManager.hasProcessedContentInRoom",

276-301: Add return type annotation for better type safety.

The method's return type can be explicitly annotated for better type safety and documentation.

Apply this diff to add the return type annotation:

-    public async markContentAsProcessed(
+    public async markContentAsProcessed(
         contentId: string,
         roomId: string
-    ): Promise<boolean> {
+    ): Promise<boolean> {
packages/core/src/core/orchestrator.ts (2)

241-335: Consider implementing rate limiting for the processing queue.

The queue processing loop could benefit from rate limiting to prevent overwhelming the system when processing large arrays of data.

Apply this diff to add rate limiting:

 while (queue.length > 0) {
     const { data, source } = queue.shift()!;
+    
+    // Add rate limiting
+    if (queue.length > 0) {
+        await new Promise((resolve) => setTimeout(resolve, 100));
+    }

     const processedResults = await this.processContent(data, source);

541-563: Consider extracting the delay value to a configurable constant.

The hardcoded delay value of 5000ms in the array processing loop should be configurable.

Apply this diff to make the delay configurable:

+private static readonly BATCH_PROCESSING_DELAY_MS = 5000;
+
 public async processContent(
     content: any,
     source: string
 ): Promise<ProcessedResult[]> {
     if (Array.isArray(content)) {
         const allResults: ProcessedResult[] = [];
         for (const item of content) {
             // (Optional) Delay to throttle
-            await new Promise((resolve) => setTimeout(resolve, 5000));
+            await new Promise((resolve) => 
+                setTimeout(resolve, Orchestrator.BATCH_PROCESSING_DELAY_MS)
+            );
packages/core/src/core/vector-db.ts (1)

1672-1708: Consider adding sorting options for memory retrieval.

The getMemoriesFromRoom method could benefit from sorting options to allow retrieval of memories in chronological order or by other metadata fields.

Apply this diff to add sorting options:

 public async getMemoriesFromRoom(
     roomId: string,
-    limit?: number
+    limit?: number,
+    sortBy?: { field: string; order: 'asc' | 'desc' }
 ): Promise<{ content: string; metadata?: Record<string, any> }[]> {
     try {
         const collection = await this.getCollectionForRoom(roomId);
+        const orderBy = sortBy ? { [sortBy.field]: sortBy.order } : undefined;

         const results = await collection.get({
             limit,
+            orderBy,
             include: ["documents", "metadatas"] as IncludeEnum[],
         });
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 22417b4 and 2a32366.

📒 Files selected for processing (7)
  • examples/example-api.ts (3 hunks)
  • examples/example-twitter.ts (4 hunks)
  • packages/core/src/core/orchestrator.ts (5 hunks)
  • packages/core/src/core/processor.ts (1 hunks)
  • packages/core/src/core/processors/message-processor.ts (1 hunks)
  • packages/core/src/core/room-manager.ts (1 hunks)
  • packages/core/src/core/vector-db.ts (4 hunks)
🔇 Additional comments (13)
packages/core/src/core/processors/message-processor.ts (1)

30-33: Fix the canHandle implementation.
This comment was raised in a previous review. The current canHandle method always returns true and contains a TODO, which can cause unintended processing of invalid or empty content.

examples/example-twitter.ts (5)

15-15: Well-adapted replacement of the Processor class.
Switching from Processor to MessageProcessor aligns with the new abstract architecture and clarifies handling of message content.


44-44: Processor instantiation looks correct.
Passing llmClient, defaultCharacter, and loglevel appears consistent with the new MessageProcessor constructor signature.


65-65: Multiple processor support is well-utilized.
Providing an array of processors allows for more extensible scenarios, such as adding specialized processors later.


106-113: Potential user data exposure in returned metadata.
Returning the entire mention metadata may leak personally identifiable information if it contains sensitive fields. Ensure you filter or sanitize any fields that shouldn't be exposed downstream.
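A hypothetical allow-list filter illustrating the point; the field names mirror those used elsewhere in this review and are assumptions about the mention shape.

// Hypothetical filter, not part of this PR: forward only the fields downstream
// consumers actually need instead of the entire mention object.
function toSafeMentionMetadata(mention: any) {
    return {
        conversationId: mention.metadata?.conversationId,
        tweetId: mention.metadata?.tweetId,
        username: mention.metadata?.username,
        timestamp: mention.metadata?.timestamp,
    };
}

// e.g. in the mentions handler:
//   metadata: toSafeMentionMetadata(mention),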


117-118: Room and user fields are now explicitly required.
This schema change makes sense for better structuring and referencing tweets in conversation contexts.

examples/example-api.ts (3)

14-14: LGTM!

The import statement has been correctly updated to use the new MessageProcessor class from the refactored processor module.


63-63: LGTM!

The orchestrator initialization has been correctly updated to pass the processor as an array, aligning with the new multi-processor architecture.


42-45: Verify the removal of vectorDb parameter.

The processor instantiation has been simplified, but the removal of the vectorDb parameter needs verification to ensure it doesn't affect the functionality.

Run the following script to verify the parameter usage:

✅ Verification successful

The removal of vectorDb parameter is safe

The MessageProcessor class implementation shows no direct dependency on vectorDb. Vector storage operations are properly handled by the RoomManager and Orchestrator components, maintaining clean separation of concerns.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check if vectorDb is used in MessageProcessor or its parent classes
# Test: Search for vectorDb usage in processor-related files
ast-grep --pattern $'class $_ {
  $$$
  constructor($params) {
    $$$
    this.vectorDb
    $$$
  }
  $$$
}'

Length of output: 110


Script:

#!/bin/bash
# Find MessageProcessor implementation and check for vectorDb usage
echo "=== Finding MessageProcessor files ==="
fd -e ts -e js MessageProcessor

echo -e "\n=== Checking for vectorDb usage ==="
rg -l "vectorDb" --type ts --type js

echo -e "\n=== Checking MessageProcessor class implementation ==="
ast-grep --pattern 'class MessageProcessor {
  $$$
}'

echo -e "\n=== Checking for vectorDb imports ==="
rg "import.*vectorDb" --type ts --type js

Length of output: 690


Script:

#!/bin/bash
# Show context around vectorDb usage and MessageProcessor implementation
echo "=== VectorDb usage context ==="
rg "vectorDb" -B 2 -A 2 --type ts --type js

echo -e "\n=== MessageProcessor implementation ==="
rg "class MessageProcessor" -B 2 -A 10 --type ts --type js

echo -e "\n=== Checking for vectorDb method calls or property access ==="
ast-grep --pattern 'vectorDb.$_'

Length of output: 10938

packages/core/src/core/room-manager.ts (1)

229-251: LGTM!

The getMemoriesFromRoom method is well-implemented with proper error handling, null checks, and memory object formatting.

packages/core/src/core/orchestrator.ts (2)

49-53: Add validation for duplicate processor names.

When initializing the processors Map, there's no validation to ensure processor names are unique. If two processors return the same name, the latter will silently override the former.


639-647: Handle potential JSON serialization errors.

The JSON.stringify call could throw an error if the content contains circular references or undefined values.

packages/core/src/core/vector-db.ts (1)

Line range hint 1594-1671: LGTM!

The methods have been correctly made public with proper error handling, logging, and documentation.

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
packages/core/src/core/index.ts (1)

22-40: Consider maintaining consistent alphabetical ordering of exports.

While most exports follow alphabetical order, there are some inconsistencies. Consider reordering for better maintainability:

export {
    BaseProcessor,
    Chains,
-   ChainOfThought,
    ChromaVectorDB,
    Consciousness,
    defaultCharacter,
    GoalManager,
    IO,
    LLMClient,
    Logger,
    Orchestrator,
    Processors,
    Providers,
    Room,
    RoomManager,
    StepManager,
    TaskScheduler,
    Types,
    Utils,
};
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2a32366 and 3a39f48.

📒 Files selected for processing (2)
  • packages/core/src/core/index.ts (2 hunks)
  • packages/core/src/core/processors/index.ts (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • packages/core/src/core/processors/index.ts
🔇 Additional comments (2)
packages/core/src/core/index.ts (2)

5-5: LGTM! Import changes align with the processor abstraction.

The replacement of Processor with BaseProcessor and addition of the Processors namespace effectively support the new architecture for multiple processor implementations.

Also applies to: 19-19


22-40: Verify removal of unused exports.

The ChainOfThought export appears to be unused based on the AI summary which indicates it was meant to be removed.
